% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_filters).

-export([
    parse/1,
    fetch/3,
    view_type/2,
    ejsort/1
]).

-include_lib("couch/include/couch_db.hrl").

% Parse the filter from replication options proplist.
% Return {ok, {FilterType,...}} | {error, ParseError}.
% For `user` filter, i.e. filters specified as user code
% in source database, this code doesn't fetch the filter
% code, but only returns the name of the filter.
-spec parse([_]) ->
    {ok, nil}
    | {ok, {view, binary(), {[_]}}}
    | {ok, {user, {binary(), binary()}, {[_]}}}
    | {ok, {docids, [_]}}
    | {ok, {mango, {[_]}}}
    | {error, binary()}.
parse(Options) ->
    Filter = couch_util:get_value(filter, Options),
    DocIds = couch_util:get_value(doc_ids, Options),
    Selector = couch_util:get_value(selector, Options),
    case {Filter, DocIds, Selector} of
        {undefined, undefined, undefined} ->
            {ok, nil};
        {<<"_", _/binary>>, undefined, undefined} ->
            {ok, {view, Filter, query_params(Options)}};
        {_, undefined, undefined} ->
            case parse_user_filter(Filter) of
                {ok, {Doc, FilterName}} ->
                    {ok, {user, {Doc, FilterName}, query_params(Options)}};
                {error, Error} ->
                    {error, Error}
            end;
        {undefined, _, undefined} ->
            {ok, {docids, DocIds}};
        {undefined, undefined, _} ->
            {ok, {mango, ejsort(mango_selector:normalize(Selector))}};
        _ ->
            Err = "`selector`, `filter` and `doc_ids` are mutually exclusive",
            {error, list_to_binary(Err)}
    end.

% Fetches body of filter function from source database. Guaranteed to either
% return {ok, Body} or an {error, Reason}. Also assume this function might
% block due to network / socket issues for an undeterminted amount of time.
-spec fetch(binary(), binary(), binary()) ->
    {ok, {[_]}} | {error, binary()}.
fetch(DDocName, FilterName, Source) ->
    {Pid, Ref} = spawn_monitor(fun() ->
        try fetch_internal(DDocName, FilterName, Source) of
            Resp ->
                exit({exit_ok, Resp})
        catch
            throw:{fetch_error, Reason} ->
                exit({exit_fetch_error, Reason});
            _OtherTag:Reason ->
                exit({exit_other_error, Reason})
        end
    end),
    receive
        {'DOWN', Ref, process, Pid, {exit_ok, Resp}} ->
            {ok, Resp};
        {'DOWN', Ref, process, Pid, {exit_fetch_error, Reason}} ->
            {error, Reason};
        {'DOWN', Ref, process, Pid, {exit_other_error, Reason}} ->
            {error, couch_util:to_binary(Reason)}
    end.

% Get replication type and view (if any) from replication document props
-spec view_type([_], [_]) ->
    {view, {binary(), binary()}} | {db, nil} | {error, binary()}.
view_type(Props, Options) ->
    case couch_util:get_value(<<"filter">>, Props) of
        <<"_view">> ->
            {QP} = couch_util:get_value(query_params, Options, {[]}),
            ViewParam = couch_util:get_value(<<"view">>, QP),
            case re:split(ViewParam, <<"/">>) of
                [DName, ViewName] ->
                    {view, {<<"_design/", DName/binary>>, ViewName}};
                _ ->
                    {error, <<"Invalid `view` parameter.">>}
            end;
        _ ->
            {db, nil}
    end.

% Private functions

fetch_internal(DDocName, FilterName, Source) ->
    Db =
        case (catch couch_replicator_api_wrap:db_open(Source)) of
            {ok, Db0} ->
                Db0;
            DbError ->
                DbErrorMsg = io_lib:format(
                    "Could not open source database `~s`: ~s",
                    [
                        couch_replicator_api_wrap:db_uri(Source),
                        couch_util:to_binary(DbError)
                    ]
                ),
                throw({fetch_error, iolist_to_binary(DbErrorMsg)})
        end,
    try
        Body =
            case
                (catch couch_replicator_api_wrap:open_doc(
                    Db, <<"_design/", DDocName/binary>>, [ejson_body]
                ))
            of
                {ok, #doc{body = Body0}} ->
                    Body0;
                DocError ->
                    DocErrorMsg = io_lib:format(
                        "Couldn't open document `_design/~s` from source "
                        "database `~s`: ~s",
                        [
                            DDocName,
                            couch_replicator_api_wrap:db_uri(Source),
                            couch_util:to_binary(DocError)
                        ]
                    ),
                    throw({fetch_error, iolist_to_binary(DocErrorMsg)})
            end,
        try
            Code = couch_util:get_nested_json_value(
                Body, [<<"filters">>, FilterName]
            ),
            re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
        catch
            _Tag:CodeError ->
                CodeErrorMsg = io_lib:format(
                    "Couldn't parse filter code from document ~s on `~s` "
                    " Error: ~s",
                    [
                        DDocName,
                        couch_replicator_api_wrap:db_uri(Source),
                        couch_util:to_binary(CodeError)
                    ]
                ),
                throw({fetch_error, CodeErrorMsg})
        end
    after
        couch_replicator_api_wrap:db_close(Db)
    end.

-spec query_params([_]) -> {[_]}.
query_params(Options) ->
    couch_util:get_value(query_params, Options, {[]}).

parse_user_filter(Filter) ->
    case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
        {match, [DDocName0, FilterName0]} ->
            {ok, {DDocName0, FilterName0}};
        _ ->
            {error, <<"Invalid filter. Must match `ddocname/filtername`.">>}
    end.

% Sort an EJSON object's properties to attempt
% to generate a unique representation. This is used
% to reduce the chance of getting different
% replication checkpoints for the same Mango selector
ejsort({V}) ->
    ejsort_props(V, []);
ejsort(V) when is_list(V) ->
    ejsort_array(V, []);
ejsort(V) ->
    V.

ejsort_props([], Acc) ->
    {lists:keysort(1, Acc)};
ejsort_props([{K, V} | R], Acc) ->
    ejsort_props(R, [{K, ejsort(V)} | Acc]).

ejsort_array([], Acc) ->
    lists:reverse(Acc);
ejsort_array([V | R], Acc) ->
    ejsort_array(R, [ejsort(V) | Acc]).

-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").

ejsort_basic_values_test() ->
    ?assertEqual(ejsort(0), 0),
    ?assertEqual(ejsort(<<"a">>), <<"a">>),
    ?assertEqual(ejsort(true), true),
    ?assertEqual(ejsort([]), []),
    ?assertEqual(ejsort({[]}), {[]}).

ejsort_compound_values_test() ->
    ?assertEqual(ejsort([2, 1, 3, <<"a">>]), [2, 1, 3, <<"a">>]),
    Ej1 = {[{<<"a">>, 0}, {<<"c">>, 0}, {<<"b">>, 0}]},
    Ej1s = {[{<<"a">>, 0}, {<<"b">>, 0}, {<<"c">>, 0}]},
    ?assertEqual(ejsort(Ej1), Ej1s),
    Ej2 = {[{<<"x">>, Ej1}, {<<"z">>, Ej1}, {<<"y">>, [Ej1, Ej1]}]},
    ?assertEqual(
        ejsort(Ej2),
        {[{<<"x">>, Ej1s}, {<<"y">>, [Ej1s, Ej1s]}, {<<"z">>, Ej1s}]}
    ).

-endif.
